# Analytics Function Types

## Sources

### fs-source
This type reads a file from an S3 bucket and outputs its content as a DataFrame.
```yaml
- id: read
  type: fs-source
  addr: 192.168.100.237:31318
  topics: s3a://testcsv/mycsv.csv
  user: minioadmin
  password: password
  asTable: mytable # (6)!
  format: csv # (1)!
  schema: |
    a INT,
    b INT,
    c INT,
    d STRING
  meta:
    delimiter: "," # (5)!
    encoding: utf-8 # (4)!
    csvHeader: true # (3)!
    ignore-parse-errors: false
    includeFileMetadata: false # (2)!
  out:
    - process
```
1. The `format` attribute specifies the format of the files. Allowed formats are:
    - json
    - csv
    - parquet
    - delta
    - raw
    - text
    - big
2. Can optionally be set to `true` (`false` by default) to include file metadata in the output DataFrame.
3. When reading CSV, KFlow considers by default that the file has headers. If the file does not have headers, set this attribute to `false`.
4. The `encoding` attribute specifies the encoding of the CSV file. The default is `utf-8`. This value is optional, cannot currently be changed and is shown here for completeness.
5. The `delimiter` attribute specifies the delimiter of the CSV file. The default is `,`.
6. If `asTable` is set, the DataFrame is registered as a temporary table in the Spark session. This allows subsequent Tasks to run SQL queries on the DataFrame.
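To make the CSV options concrete, here is a minimal pandas sketch (pandas stands in for KFlow's Spark runtime; the column names come from the example schema) showing how `csvHeader`, `delimiter` and an explicit schema shape the read:

```python
import io

import pandas as pd

# Sample CSV matching the example schema: a INT, b INT, c INT, d STRING.
data = "a,b,c,d\n1,2,3,x\n10,20,30,y\n"

df = pd.read_csv(
    io.StringIO(data),
    sep=",",    # meta.delimiter: ","
    header=0,   # meta.csvHeader: true -> first line holds the column names
    dtype={"a": "int64", "b": "int64", "c": "int64", "d": str},  # schema
)
print(list(df.columns))  # ['a', 'b', 'c', 'd']
print(len(df))           # 2
```

With `csvHeader: false` the equivalent read would pass `header=None` and supply the column names explicitly.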
#### csv format

See the example above. If the schema is not given, it is inferred from the file.
#### json format

Reads a JSON Lines file given in `topics`. The input is read twice: first to infer the schema, then to read the data.
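The two-pass behavior can be sketched in plain Python as a stand-in for KFlow's reader (the field names are illustrative): the first pass infers the schema as the union of the fields seen on every line, the second pass reads the data against that schema.

```python
import io
import json

# A small JSON Lines input (one JSON object per line), as read from `topics`.
lines = io.StringIO('{"a": 1, "b": "x"}\n{"a": 2, "b": "y", "c": 3.5}\n')

# First pass: infer the schema as the union of fields across all lines.
schema = {}
for line in lines:
    for key, value in json.loads(line).items():
        schema.setdefault(key, type(value).__name__)

# Second pass: read the data again, now with a known column set.
lines.seek(0)
rows = [{k: json.loads(line).get(k) for k in schema} for line in lines]

print(schema)   # {'a': 'int', 'b': 'str', 'c': 'float'}
print(rows[0])  # {'a': 1, 'b': 'x', 'c': None}
```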
#### parquet format

Loads a Parquet file. The schema is inferred from the file.
#### delta format

Loads a Delta Lake file. The schema is inferred from the file.
#### raw format

Loads a binary file. The output schema is the following:

- `path`: string
- `modificationTime`: string
- `length`: long
- `content`: binary content of the file
#### text format

Reads a text file. Each line is a record with a single column `value`. If the option `delimiter: eof` is given, the whole file is read as a single record.
```yaml
- id: read
  type: fs-source
  format: text
  addr: 192.168.100.237:31318
  topics: s3a://test/mytext.text
  user: minioadmin
  password: password
  schema: |
    value STRING
  meta:
    delimiter: eof
  out:
    - process
```
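The effect of `delimiter: eof` can be sketched in plain Python (a stand-in for KFlow's reader, using the single `value` column from the schema above):

```python
import io

text = "first line\nsecond line\nthird line\n"

def read_text(buf, delimiter=None):
    """Mimic the text format: one record per line, or one record for the
    whole file when delimiter is 'eof' (single column `value`)."""
    if delimiter == "eof":
        return [{"value": buf.read()}]
    return [{"value": line.rstrip("\n")} for line in buf]

print(len(read_text(io.StringIO(text))))         # 3 records, one per line
print(len(read_text(io.StringIO(text), "eof")))  # 1 record for the whole file
```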
#### big format
```yaml
- id: read
  type: fs-source
  format: big
  addr: 192.168.100.237:31318
  topics: s3a://test/subdir
  user: minioadmin
  password: password
  schema: |
    local_path STRING
  meta: # (1)!
    ignoreCorruptFiles: false
    ignoreMissingFiles: false
    pathGlobFilter: "*.png"
    recursiveFileLookup: true
  out:
    - process
```
1. The following options may be used.

The `topics` field points to a list of files, which are all downloaded locally. The result is a DataFrame with a `local_path` column containing the path to each downloaded file.
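A rough sketch of how `pathGlobFilter` and `recursiveFileLookup` select files (plain Python stand-in; the object listing is hypothetical):

```python
import fnmatch
import posixpath

# Hypothetical object listing under s3a://test/subdir.
listing = [
    "subdir/a.png",
    "subdir/notes.txt",
    "subdir/nested/b.png",
]

def select(paths, glob="*", recursive=False):
    """Apply pathGlobFilter to file names; without recursiveFileLookup,
    keep only files directly under the top-level directory."""
    out = []
    for p in paths:
        if not recursive and p.count("/") > 1:
            continue  # file sits in a subdirectory, skip it
        if fnmatch.fnmatch(posixpath.basename(p), glob):
            out.append(p)
    return out

print(select(listing, glob="*.png", recursive=True))   # both .png files
print(select(listing, glob="*.png", recursive=False))  # only subdir/a.png
```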
### sim-source

Generates data with the following schema:
| a | b | c |
|---|---|---|
| 0 | 0 | 0 |
| 1 | 1 | 2 |
| ... | ... | ... |
| 99 | 99 | 99 |
The number of generated lines is 100 by default; it can be adjusted with the `size` meta parameter:
```yaml
- id: read
  type: sim-source
  schema: | # (1)!
    a INT,
    b INT,
    c INT
  meta:
    size: 1000 # (2)!
  out:
    - process
```
1. The schema is optional, as it is known and constant.
2. Generate 1000 lines of data.
### jdbc-source, sql-source, postgresql-source

All these types read data from a PostgreSQL table in a database. For clarity, use the type `postgresql-source`.

In the following example, adapt `host`, `port`, `dbname`, `user`, `password` and `table` to your configuration. The schema is inferred from the table (or the subquery, if one is used).
```yaml
- id: read
  type: postgresql-source
  addr: jdbc:postgresql://host:port/dbname
  topics: table # (1)!
  meta:
    user: user
    password: password
```
1. According to the PySpark documentation, the table name can be anything that is valid in a FROM clause of a SQL query.
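Since any valid FROM-clause expression is accepted, a subquery can stand in for the table name. A sketch (the alias `t`, columns and filter are illustrative):

```yaml
- id: read
  type: postgresql-source
  addr: jdbc:postgresql://host:port/dbname
  topics: (SELECT a, b FROM table WHERE a > 10) AS t
  meta:
    user: user
    password: password
```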
## Processors

### query
This type executes a SQL query on a DataFrame. The query runs against the DataFrame given as input, and the result is the output.
```yaml
- id: process
  type: query
  sql: |
    SELECT * FROM mytable WHERE a > 10
  output:
    - print
```
### map

This type applies a function to a DataFrame. The function is given inline or as a call to a function in a module (see UDF). It should take an iterator of DataFrames as input and return an iterator of DataFrames as output.

Here is an example with an inline function.
```yaml
- id: process
  type: map
  name: process # (1)!
  schema: | # (2)!
    a INT,
    b INT,
    c INT
  asTable: mytable # (3)!
  fn: |
    def process(df_iterator):
        for df in df_iterator:
            df['c'] = df['a'] + df['b']
            yield df[['a', 'b', 'c']]
  output:
    - print
```
1. Name of the Python function to call.
2. Output schema.
3. If `asTable` is set, the DataFrame is registered as a temporary table in the Spark session. This allows SQL queries on the DataFrame.
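The inline `fn` above follows the iterator-of-DataFrames convention (as in Spark's pandas-based map functions). The same function can be exercised directly with pandas to see what KFlow would feed it and get back:

```python
import pandas as pd

def process(df_iterator):
    """Same function as in the example: add c = a + b to each batch."""
    for df in df_iterator:
        df["c"] = df["a"] + df["b"]
        yield df[["a", "b", "c"]]

# Feed the function an iterator of DataFrame batches, as the runtime would.
batches = iter([pd.DataFrame({"a": [1, 2], "b": [10, 20]})])
result = pd.concat(process(batches), ignore_index=True)
print(result["c"].tolist())  # [11, 22]
```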
## Sinks

### print

Prints the DataFrame to the console.
### print-schema

Prints the schema of the input to the console in tree format.
### fs-sink

Writes the DataFrame to one or several files on an S3 bucket.
```yaml
- id: write
  type: fs-sink
  coalesce: 1 # (3)!
  format: json | csv | parquet | delta
  partitionBy: col # (2)!
  topics: s3a://testcsv/mycsv.csv
  addr: 192.168.100.237:31318
  user: minioadmin
  password: password
  meta:
    # option for all formats
    writeMode: overwrite # (1)!
    # options for delta format
    maxRecordsPerFile: 1000
    # options for csv format
    csvHeader: true
    delimiter: ","
```
1. The `writeMode` attribute specifies the write mode. `overwrite` is the default. Allowed values are:
    - overwrite
    - append
    - ignore (ignore this operation if data exists)
    - error (raise an exception if data already exists)
2. The output is partitioned by the given column name. If specified, the output is laid out on the bucket similarly to Hive's partitioning scheme. This parameter is optional and not available for the delta format.
3. Coalesce the output into this number of partitions. This attribute is optional; no coalescing is done if it is not provided.
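The Hive-style layout produced by `partitionBy` can be sketched in plain Python (the bucket path, partition values and part-file name are illustrative): each distinct value of the partition column becomes a `col=value/` directory on the bucket.

```python
# Rows about to be written with `partitionBy: col`.
rows = [
    {"col": "fr", "x": 1},
    {"col": "en", "x": 2},
    {"col": "fr", "x": 3},
]

# One directory per distinct partition value, holding that value's part files.
paths = sorted({f"s3a://testcsv/out/col={r['col']}/part-00000.csv" for r in rows})
print(paths)
# ['s3a://testcsv/out/col=en/part-00000.csv',
#  's3a://testcsv/out/col=fr/part-00000.csv']
```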
### jdbc-sink, sql-sink, postgresql-sink

All these types write data to a PostgreSQL table in a database. For clarity, use the type `postgresql-sink`.

In the following example, adapt `host`, `port`, `dbname`, `user`, `password` and `table` to your configuration.
```yaml
- id: write
  type: postgresql-sink
  addr: jdbc:postgresql://host:port/dbname
  topics: table # (2)!
  meta:
    writeMode: append # (1)!
    user: user
    password: password
```
1. The `writeMode` attribute specifies the write mode. `append` is the default. Allowed values are:
    - overwrite
    - append
    - ignore (ignore this operation if data exists)
    - error (raise an exception if data already exists)
2. The table should exist in the database. If it does not, it is created with the schema of the DataFrame.
### kafka-sink

Serializes each record of the DataFrame and outputs it to a Kafka topic.
```yaml
- id: write
  type: kafka-sink
  topics: topic1
  addr: 192.168.2.35:9092,192.168.2.37:9092 # (1)!
```
1. The `addr` attribute specifies the Kafka bootstrap servers, as a comma-separated list of `host:port`.
### janusgraph-sink

This type allows implementing a custom sink by forking the runtime. It is a no-op in the current version of KFlow.